//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.
#include "pure_mem/pmemrep.h"

#include "pure_mem/encoded_version_node.h"
#include "pure_mem/uncoded_version_node.h"

namespace rocksdb {

// Constructs a rep that logs through `info_log`.
//
// The index (art_list_) is built with `compare` and the ART_ROWEX mode, and
// this overload always creates a MultiRangeArena bound to this rep.
// `transform` and `lookahead` are only stored by this constructor.
PureMemRep::PureMemRep(const MemTableRep::KeyComparator& compare,
                       Allocator* allocator, const SliceTransform* transform,
                       Logger* info_log, const size_t lookahead)
    : MemTableRep(allocator),
      cmp_(compare),
      art_list_(compare, ART_ROWEX),
      transform_(transform),
      lookahead_(lookahead),
      // Heap-allocated here; the destructor deletes it.
      multi_range_arena_(new MultiRangeArena(this, info_log)),
      info_log_(info_log) {}

// Constructs a rep configured from column-family options.
//
// The logger is taken from `ioptions.info_log`. A MultiRangeArena is created
// only when `ioptions.encode_version_node` is set; otherwise
// multi_range_arena_ stays null.
PureMemRep::PureMemRep(const MemTableRep::KeyComparator& compare,
                       Allocator* allocator, const SliceTransform* transform,
                       const ImmutableCFOptions& ioptions, const size_t lookahead)
    : MemTableRep(allocator),
      cmp_(compare),
      art_list_(compare, ART_ROWEX),
      transform_(transform),
      lookahead_(lookahead),
      multi_range_arena_(nullptr),
      info_log_(ioptions.info_log) {
  // Nothing more to set up when encoded version nodes are disabled.
  if (!ioptions.encode_version_node) {
    return;
  }
  multi_range_arena_ = new MultiRangeArena(this, info_log_);
}

// Creates a new EncodedVersionNode and reserves `len` bytes for its entry in
// the multi-range arena.
//
// @param len         number of bytes to reserve for the encoded entry
// @param buf         out: receives the buffer returned by the arena
// @param key         key forwarded to the arena's Allocate()
// @param rangearena  in/out handle forwarded to the arena's Allocate()
// @return the new node, as an opaque KeyHandle
//
// Requires that this rep was built with a MultiRangeArena (the
// ImmutableCFOptions constructor leaves it null when encoded version nodes
// are disabled).
KeyHandle PureMemRep::AllocatePure(const size_t len, char** buf,
                                   const Slice& key,
                                   void** rangearena) {
  assert(multi_range_arena_ != nullptr);

  auto* node = new EncodedVersionNode();
  node->SetNext(nullptr);
  node->SetPrev(nullptr);
  *buf = multi_range_arena_->Allocate(len, key, node, rangearena);

  // Attach the buffer to the node; the swap from nullptr is expected to
  // succeed and is checked in debug builds.
  const bool ok = node->CASSetKey(nullptr, *buf);
  assert(ok);
  (void)ok;  // `ok` is otherwise unused when assert() compiles away (NDEBUG)

  return static_cast<KeyHandle>(node);
}

void PureMemRep::AllocateOK(const Slice& key, const uint32_t encoded_len,
                            KeyHandle handle, void* rangearena) {
  auto node = static_cast<VersionNode*>(handle);
  rocksdb::MultiRangeArena::AllocateOK(key, encoded_len, node->Key(), rangearena);
}

// Returns the rep's MultiRangeArena pointer. The rep keeps ownership (it
// deletes the arena in its destructor); the pointer may be null if no arena
// was created at construction time.
MultiRangeArena* PureMemRep::GetMultiRangeArenaPtr() {
  return multi_range_arena_;
}

// Inserts the entry identified by `handle` into the index. The index's
// return value is ignored.
void PureMemRep::Insert(KeyHandle handle) {
  char* const entry = static_cast<char*>(handle);
  art_list_.Insert(entry);
}

void PureMemRep::AcceptDelete(const char* buf, void* rangearena) {
  MvccKey iter_mk, delete_mk;
  void* nodePush;
  Slice key = GetLengthPrefixedSlice(buf);
  delete_mk.parseKey(key);
  uint64_t delete_seq, iter_seq;
  ValueType delete_t, iter_t;
  UnPackSequenceAndType(delete_mk.seqNumAndType_, &delete_seq, &delete_t);
  size_t len;
  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  iter->Seek(buf);
  assert(iter->Valid());
  while (iter->Valid()) {
//    auto node = iter->node();
    Slice curKey = GetLengthPrefixedSlice(iter->key());
//    node->GetMvccKey(iter_mk, curKey);
//    Slice val;
//    node->GetValue(curKey, val);
    Slice val = GetLengthPrefixedSlice(curKey.data() + curKey.size());
    len = VarintLength(curKey.size()) + curKey.size() +
          VarintLength(val.size()) + val.size();
    iter_mk.parseKey(curKey);

    if (iter_mk.userKey_.compare(delete_mk.userKey_) != 0) {
      break;
    }
    UnPackSequenceAndType(iter_mk.seqNumAndType_, &iter_seq, &iter_t);
    if (iter_mk.timestamp_.compare(delete_mk.timestamp_) == 0 &&
        iter_seq < delete_seq) {
      nodePush = iter->PushCurNode();
      multi_range_arena_->Delete(iter_mk.userKey_, len, rangearena, nodePush);
    } else {
      iter->Next();
    }
  }
}

void PureMemRep::AcceptDelete(const VersionNode &vn) {
  auto &versionNode = static_cast<const UncodedVersionNode&>(vn);
  const MvccKey &delete_mk = versionNode.GetMvccKey();
  uint64_t delete_seq = versionNode.GetSequenceNumber();

  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  iter->Seek(versionNode);
  assert(iter->Valid());
  while (iter->Valid()) {
    auto *uvn = dynamic_cast<UncodedVersionNode*>(iter->node());
    const MvccKey &iter_mk = uvn->GetMvccKey();

    if (iter_mk.userKey_.compare(delete_mk.userKey_) != 0) {
      break;
    }
    uint64_t iter_seq = uvn->GetSequenceNumber();
    if (iter_mk.timestamp_.compare(delete_mk.timestamp_) == 0 &&
        iter_seq < delete_seq) {
      void *nodePush = iter->PushCurNode();
      rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion((void*)nodePush, DELETION_TYPE_VERSION);
    } else {
      iter->Next();
    }
  }
}

bool PureMemRep::AcceptSingleDelete(const char* buf) {
  MvccKey iter_mk;
  size_t len;
  uint64_t delete_seq, iter_seq;
  ValueType delete_t, iter_t;
  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  iter->Seek(buf);
  assert(iter->Valid());
//  auto node = iter->node();
  Slice curKey = GetLengthPrefixedSlice(iter->key());
//  node->GetMvccKey(iter_mk, curKey);
//  Slice val;
//  node->GetValue(curKey, val);
  Slice val = GetLengthPrefixedSlice(curKey.data() + curKey.size());
  len = VarintLength(curKey.size()) + curKey.size() + VarintLength(val.size()) +
        val.size();
  iter_mk.parseKey(curKey);

  MvccKey delete_mk;
  Slice key = GetLengthPrefixedSlice(buf);
  delete_mk.parseKey(key);
  UnPackSequenceAndType(delete_mk.seqNumAndType_, &delete_seq, &delete_t);
//  node->GetSeqAndType(iter_mk.seqNumAndType_, &iter_seq, &iter_t);
  UnPackSequenceAndType(iter_mk.seqNumAndType_, &iter_seq, &iter_t);
  if(delete_seq == iter_seq)
      return false;
  assert(iter_mk.userKey_.compare(delete_mk.userKey_) == 0
        && iter_mk.timestamp_.compare(delete_mk.timestamp_) == 0);
  void* nodePush = iter->PushCurNode();

  multi_range_arena_->Delete(iter_mk.userKey_, len, nullptr, nodePush);
  return true;
}

bool PureMemRep::AcceptSingleDelete(const VersionNode& vn) {
  auto &versionNode = static_cast<const UncodedVersionNode&>(vn);
  const MvccKey &delete_mk = versionNode.GetMvccKey();
  uint64_t delete_seq = versionNode.GetSequenceNumber();

  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  iter->Seek(versionNode);
  assert(iter->Valid());
  auto *uvn = static_cast<UncodedVersionNode*>(iter->node());
  const Slice &val = uvn->GetValue();
  const MvccKey &iter_mk = uvn->GetMvccKey();
  uint64_t iter_seq = uvn->GetSequenceNumber();

  if(delete_seq == iter_seq)
    return false;
  assert(iter_mk.userKey_.compare(delete_mk.userKey_) == 0
         && iter_mk.timestamp_.compare(delete_mk.timestamp_) == 0);
  void* nodePush = iter->PushCurNode();

  rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion((void*)nodePush, DELETION_TYPE_VERSION);
  return true;
}

void PureMemRep::AcceptRangeDelete(const char* buf) {
  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  iter->Seek(buf);
  Slice curKey = GetLengthPrefixedSlice(buf);
  Slice end = GetLengthPrefixedSlice(curKey.data() + curKey.size());
  MvccKey begin;
  begin.parseKey(curKey);

  MvccKey iter_mk;
  void* nodePush, *rangearena = nullptr;
  while (iter->Valid()) {
    Slice curkey = GetLengthPrefixedSlice(iter->key());
//    auto cur_node = iter->node();
//    cur_node->GetMvccKey(iter_mk, curkey);
//    Slice curval;
//    cur_node->GetValue(curkey, curval);
    iter_mk.parseKey(curkey);
    Slice curval = GetLengthPrefixedSlice(curkey.data() + curkey.size());
    size_t len = VarintLength(curkey.size()) + curkey.size() +
                 VarintLength(curval.size()) + curval.size();
    uint64_t iter_seq, del_seq;
    ValueType iter_t, del_t;
    int compp = iter_mk.userKey_.compare(end);
    if (compp > 0) {
      break;
    }
    if (compp == 0 && iter_mk.timestamp_.compare(begin.timestamp_) < 0) {
      break;
    }

//    cur_node->GetSeqAndType(iter_mk.seqNumAndType_, &iter_seq, &iter_t);
    UnPackSequenceAndType(iter_mk.seqNumAndType_, &iter_seq, &iter_t);
    UnPackSequenceAndType(begin.seqNumAndType_, &del_seq, &del_t);
    if (iter_seq > del_seq) {
      iter->Next();
      continue;
    }
    nodePush = iter->PushCurNode();
    rangearena = multi_range_arena_->Delete(iter_mk.userKey_, len, rangearena, nodePush);
  }
}

void PureMemRep::AcceptRangeDelete(const VersionNode& vn) {
  InlineUserKeyIndex<const MemTableRep::KeyComparator&>::Iterator* iter =
      art_list_.GetIterator();
  auto &versionNode = static_cast<const UncodedVersionNode&>(vn);
  iter->Seek(versionNode);
  const Slice &end = versionNode.GetValue();
  const MvccKey &begin = versionNode.GetMvccKey();
  uint64_t del_seq = versionNode.GetSequenceNumber();

  void* nodePush = nullptr;
  while (iter->Valid()) {
    auto *uvn = static_cast<UncodedVersionNode*>(iter->node());
    const MvccKey &iter_mk = uvn->GetMvccKey();
    const Slice &curval = uvn->GetValue();
    int compp = iter_mk.userKey_.compare(end);
    if (compp > 0) {
      break;
    }
    if (compp == 0 && iter_mk.timestamp_.compare(begin.timestamp_) < 0) {
      break;
    }

    uint64_t iter_seq = uvn->GetSequenceNumber();
    if (iter_seq > del_seq) {
      iter->Next();
      continue;
    }
    nodePush = iter->PushCurNode();
    rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion((void*)nodePush, DELETION_TYPE_VERSION);
  }
}

// Inserts the entry identified by `handle` into the index and returns the
// index's result unchanged.
bool PureMemRep::InsertKey(KeyHandle handle) {
  return art_list_.Insert(static_cast<char*>(handle));
}

// Inserts the entry identified by `handle`, passing `hint` through to the
// index. The index's return value is ignored.
void PureMemRep::InsertWithHint(KeyHandle handle, void** hint) {
  char* const entry = static_cast<char*>(handle);
  art_list_.InsertWithHint(entry, hint);
}

// Inserts the entry identified by `handle`, passing `hint` through to the
// index, and returns the index's result.
bool PureMemRep::InsertKeyWithHint(KeyHandle handle, void** hint) {
  char* const entry = static_cast<char*>(handle);
  const bool inserted = art_list_.InsertWithHint(entry, hint);
  return inserted;
}

// Inserts the entry identified by `handle` through the index's concurrent
// insert path. The index's return value is ignored.
void PureMemRep::InsertConcurrently(KeyHandle handle) {
  char* const entry = static_cast<char*>(handle);
  art_list_.InsertConcurrently(entry);
}

// Inserts the entry identified by `handle` through the index's concurrent
// insert path and returns the index's result.
bool PureMemRep::InsertKeyConcurrently(KeyHandle handle) {
  char* const entry = static_cast<char*>(handle);
  const bool inserted = art_list_.InsertConcurrently(entry);
  return inserted;
}

// Reports whether the index holds an entry that compares equal to `key`.
bool PureMemRep::Contains(const char* key) const {
  const bool present = art_list_.Contains(key);
  return present;
}

// Always reports zero: all memory is allocated through the allocator, so the
// rep has nothing of its own to add.
size_t PureMemRep::ApproximateMemoryUsage() {
  return size_t{0};
}

// Looks up `k` and feeds entries to `callback_func`.
//
// A HashIterator is positioned with the memtable key of `k`; each entry from
// that position on is passed to `callback_func(callback_args, entry)` until
// the iterator is exhausted or the callback returns false.
void PureMemRep::Get(const LookupKey& k, void* callback_args,
                     bool (*callback_func)(void* arg, const char* entry)) {
  PureMemRep::HashIterator iter(&art_list_);
  Slice dummy_slice;
  iter.Seek(dummy_slice, k.memtable_key().data());
  while (iter.Valid() && callback_func(callback_args, iter.key())) {
    iter.Next();
  }

  // Disabled alternative lookup that goes through the index's hash map:
  //
  //  std::unique_lock<std::mutex> lk(art_list_.mtx_);
  //  auto uMapIter = art_list_.hashMap_.find(k.user_key().ToString());
  //  if (uMapIter != art_list_.hashMap_.end()) {
  //    VersionNode *retNode = uMapIter->second->getContentList();
  //
  //    // Compare the HLC and seqnum parts of the keys.
  //    while (retNode == nullptr || this->cmp_(retNode->Key(), k.memtable_key().data()) < 0) {
  //      if (retNode == nullptr) {
  //        return;
  //      } else {
  //        retNode = retNode->Next();
  //      }
  //    }
  //    callback_func(callback_args, retNode->Key());
  //  }
}

// Estimates how many entries lie between `start_ikey` and `end_ikey` as the
// difference of the index's EstimateCount() for the two encoded keys.
// Returns 0 when the end estimate is smaller than the start estimate.
uint64_t PureMemRep::ApproximateNumEntries(const Slice& start_ikey,
                                           const Slice& end_ikey) {
  // Scratch buffer reused for both encodings; each encoded key is consumed by
  // EstimateCount() before the buffer is overwritten.
  std::string scratch;
  const uint64_t start_count =
      art_list_.EstimateCount(EncodeKey(&scratch, start_ikey));
  const uint64_t end_count =
      art_list_.EstimateCount(EncodeKey(&scratch, end_ikey));
  if (end_count < start_count) {
    return 0;
  }
  return end_count - start_count;
}

// Releases the MultiRangeArena created by the constructors.
PureMemRep::~PureMemRep() {
  // The pointer may be null (one constructor creates the arena only
  // conditionally); deleting a null pointer is a no-op.
  delete multi_range_arena_;
}

// Returns the length-prefixed key stored at `key` with its last 8 bytes
// removed.
//
// @param key  pointer to a length-prefixed key
// @return a Slice over the same memory, 8 bytes shorter than the decoded key
//
// The decoded key must be at least 8 bytes long; otherwise the unsigned
// subtraction would wrap around and yield an oversized Slice. This is checked
// in debug builds.
Slice PureMemRep::UserKey(const char* key) const {
  // NOTE(review): presumably the packed sequence-number/type trailer — confirm.
  constexpr size_t kTrailerSize = 8;
  Slice slice = GetLengthPrefixedSlice(key);
  assert(slice.size() >= kTrailerSize);
  return Slice(slice.data(), slice.size() - kTrailerSize);
}

// Creates a new PureMemRep from the given comparator, allocator, transform
// and logger, using the factory's configured lookahead. The rep is returned
// through its MemTableRep base pointer.
MemTableRep* PureMemFactory::CreateMemTableRep(
    const MemTableRep::KeyComparator& compare, Allocator* allocator,
    const SliceTransform* transform, Logger* info_log) {
  PureMemRep* rep =
      new PureMemRep(compare, allocator, transform, info_log, lookahead_);
  return rep;
}


}  // namespace rocksdb
